Skip to content

Support temporal bucketing in Union and Reduce operators#36648

Draft
antiguru wants to merge 7 commits into
mainfrom
claude/fix-clu-86-SWx5I
Draft

Support temporal bucketing in Union and Reduce operators#36648
antiguru wants to merge 7 commits into
mainfrom
claude/fix-clu-86-SWx5I

Conversation

@antiguru
Copy link
Copy Markdown
Member

Motivation

This change extends temporal bucketing support to Union and Reduce operators, allowing future-stamped updates (e.g., from mz_now() MFPs) to be delayed until their bucket boundary releases them. This reduces memory pressure by preventing such updates from accumulating in consolidation batchers until the input frontier catches up.

Description

The changes introduce an ArrangementStrategy field to both Union and Reduce plan nodes, which the renderer consults to decide whether to apply temporal bucketing:

For Union operators:

  • Added input_strategies: Vec<ArrangementStrategy> field, aligned with inputs
  • When consolidate_output is true and temporal bucketing is enabled, the renderer applies bucketing to inputs marked with TemporalBucketing strategy before concatenation
  • Bucketing is only meaningful when consolidation follows, so it's gated on consolidate_output

For Reduce operators:

  • Added input_strategy: ArrangementStrategy field
  • When the reduce performs pre-aggregation consolidation (monotonic hierarchical reductions with must_consolidate set), the renderer applies bucketing before the consolidate if the strategy indicates it
  • This prevents future-stamped updates from piling up in the KeyBatcher

Supporting changes:

  • Made MaybeBucketByTime::maybe_apply_temporal_bucketing generic over the data type D (previously hardcoded to Row), allowing it to work with different collection types
  • Updated the lowering logic to populate input_strategies based on whether inputs have future updates via strategy_from_future()
  • Updated all plan traversal code (explain, interpret, render_plan) to handle the new fields

The implementation respects the ENABLE_COMPUTE_TEMPORAL_BUCKETING dynamic config and uses TEMPORAL_BUCKETING_SUMMARY to determine bucket boundaries.

Verification

The changes are primarily structural additions to the plan representation and rendering logic. Existing tests should continue to pass as the new fields are properly threaded through all plan traversal code. The temporal bucketing behavior itself is gated behind the ENABLE_COMPUTE_TEMPORAL_BUCKETING config, so it won't affect default behavior until explicitly enabled.

https://claude.ai/code/session_01XsGDMKZricZbyiB67npsNG

claude added 7 commits May 18, 2026 11:23
The `Union { consolidate_output: true }` arm previously fed the
concatenated stream directly into `consolidate_named::<KeyBatcher>`.
Future-dated updates therefore accumulated in the consolidate operator
until the input frontier caught up — exactly the situation
`BucketChain` was introduced to avoid in the `ArrangeBy` lowering.

Apply `MaybeBucketByTime::maybe_apply_temporal_bucketing` to the
concatenated stream before the consolidate, gated on
`ENABLE_COMPUTE_TEMPORAL_BUCKETING`. The trait is a no-op for partially
ordered timestamps (e.g. inside iterative scopes), so this only does
real work in non-iterative scopes where `BucketChain` is meaningful.

Fixes CLU-86.
The `Union { consolidate_output: true }` arm previously fed the
concatenated stream directly into `consolidate_named::<KeyBatcher>`.
Future-dated updates therefore accumulated in the consolidate operator
until the input frontier caught up — the situation `BucketChain` was
introduced to avoid in the `ArrangeBy` lowering.

Track `has_future_updates` per Union input through lowering and surface
it as `input_has_future_updates: Vec<bool>` on `PlanNode::Union` (and
the corresponding `RenderPlan` variant). The renderer applies
`MaybeBucketByTime::maybe_apply_temporal_bucketing` only to those
specific inputs that may carry future updates, and only when
`consolidate_output` is set and `ENABLE_COMPUTE_TEMPORAL_BUCKETING` is
on. Inputs that the lowering knows cannot carry future-stamped updates
pay no bucketing cost.

Fixes CLU-86.
The previous commit attached a `Vec<bool>` "has future updates" flag to
`PlanNode::Union`. That conflates an input property with a rendering
decision and forces the renderer to translate "is future" into "should
bucket" — the same translation the lowering already does for
`ArrangeBy`.

Reuse `ArrangementStrategy` per Union input. The lowering runs each
input's `has_future_updates` through `strategy_from_future`, so the
plan carries `Direct` / `TemporalBucketing` — what the renderer should
do, not what is true of the input. The renderer simply matches on the
strategy, mirroring `ArrangeBy`. `ArrangementStrategy`'s docstring is
broadened to cover both consumers.
`build_monotonic` (`render/reduce.rs:1164`) feeds its
`consolidate_named_if::<KeyBatcher>` without temporal bucketing.
Future-stamped updates (e.g., from a temporal MFP feeding into a
monotonic hierarchical reduction with `must_consolidate=true`)
therefore accumulate in the batcher until the input frontier catches
up — the same gap CLU-86 fixes for Union.

Carry the rendering decision on the LIR `Reduce` node as
`input_strategy: ArrangementStrategy`, mirroring `ArrangeBy::strategy`
and the new Union `input_strategies`. The lowering sets it via
`strategy_from_future(input_future)`. The renderer threads it through
`render_reduce` → `render_reduce_plan` → `render_reduce_plan_inner`
and `build_monotonic` applies `MaybeBucketByTime` ahead of the
consolidate when the strategy is `TemporalBucketing`,
`must_consolidate` is set, and `ENABLE_COMPUTE_TEMPORAL_BUCKETING` is
on.

Generalise `MaybeBucketByTime::maybe_apply_temporal_bucketing` over
the data type so the monotonic reduce can bucket its
`(Row, Vec<Row>)` stream; existing `Row` callers (Union, ArrangeBy)
keep working via type inference.
`MzData + Data` alone is insufficient — the inner `bucket` impl also
requires `timely::ExchangeData` (for the `Exchange` PACT) and
`Hashable` (for `d.hashed()`). Use `MzData + ExchangeData + Hashable`,
which folds `Ord + Clone + Debug + 'static` into
`differential_dataflow::ExchangeData`.
`build_monotonic` in reduce and the two `consolidate_named_if` sites in
top-k all sit in front of `KeyBatcher` consolidates that fire on the
single-time refinement path (`refine_single_time_operator_selection`).
That path upgrades any `Basic`/`Bucketed` to a monotonic variant with
`must_consolidate=true`, including plans whose MIR Filters carry
temporal predicates. Future-stamped updates therefore pile up in the
batcher until the input frontier catches up — the same gap as Union
and the previous Reduce fix.

Add `input_strategy: ArrangementStrategy` to `PlanNode::TopK`,
populated by the lowering via `strategy_from_future`. Thread it into
`render_topk` and `render_top1_monotonic`.

Replace the `consolidate_named_if(must_consolidate, name)` calls in
`build_monotonic`, the `MonotonicTopK` arm of `render_topk`, and
`render_top1_monotonic` with explicit `if must_consolidate { ... }`.
That removes the bool-passing API at the call site and gives the
bucketing a natural place to live inside the same branch.

Share the bucketing logic via a new
`Context::bucket_for_consolidate` helper, used now by Union, Reduce,
and both TopK paths.
@ggevay
Copy link
Copy Markdown
Contributor

ggevay commented May 21, 2026

Some comments:

1. Reduce coverage is narrower in #36648 than in #36644

render_reduce_plan dispatches over six variants. Each one builds its own batcher-backed arrangement(s):

Variant How it arranges Bucketed by #36648? Bucketed by #36644?
Reduce::Hierarchical::Monotonic consolidate_named_if (KeyBatcher) yes (in build_monotonic) yes
Reduce::Hierarchical::Bucketed per-bucket mz_arrange no yes
Reduce::Accumulable mz_arrange in build_accumulable no yes
Reduce::Basic::Single / ::Multiple mz_arrange in build_basic_aggregate no yes
Reduce::Distinct mz_arrange no yes

#36644 hooks the bucket call at the top of render_reduce itself, on key_val_input.as_collection(). The bucketed key_val_collection then flows into every variant via render_reduce_plan — one bucket op per Reduce LIR node, all six variants covered.

This matters for the user's repro, which contains — on top of temporal-filter-bearing inputs — a non-monotonic MAX(...) with retractions (→ Reduce::Hierarchical::Bucketed), a window function (lead()Reduce::Basic), and Accumulable aggregates elsewhere in the pipeline.

If #36648 lands and #36644 is dropped, none of those Reduce sites get bucketing. Cleanest fix: pull the top-of-render_reduce site from #36644 into #36648 and drop the inside-build_monotonic one (the upstream site dominates — by the time data reaches consolidate_named_if, the future-stamped updates have already been held back by the bucket op). Alternative if you'd rather stay site-by-site: add bucket_for_consolidate in build_basic_aggregate, build_accumulable, build_bucketed_negated_output, and build_distinct too.

2. TopK::Basic (non-monotonic) is uncovered

The LIR field PlanNode::TopK::input_strategy is already plumbed by your PR, but only the MonotonicTopKPlan and MonotonicTop1Plan arms of render_topk consult it. BasicTopKPlan (the LIMIT n path with n > 1) goes through build_topkbuild_topk_stagebuild_topk_negated_stage (top_k.rs L514), which builds its primary arrangement via batcher-backed input.mz_arrange::<…, RowRowBatcher, …>(…). Same future-update pathology, same fix shape.

This is a pre-existing gap (fec2af8-era), not something your PR introduces, but trivial to close while you're in the area: either consult input_strategy in the Basic arm too, or hoist bucket_for_consolidate once above the TopKPlan match.

Triggering shape: SELECT … FROM t WHERE mz_now() < … ORDER BY … LIMIT n with n > 1.

@ggevay
Copy link
Copy Markdown
Contributor

ggevay commented May 22, 2026

Some more comments, pointing out some differences between #36644 and this PR, in case we want to move forward with this PR.

3. Threshold is not covered

MirRelationExpr::Threshold lowering in #36648 doesn't consult or propagate
the future-updates flag, and render_threshold doesn't bucket. #36644 clears
has_future_updates on Threshold when its conditional new_arranged wrap
fires, as defense-in-depth. (We verified this case is currently unreachable
from any SQL surface because EXCEPT inserts Distinct per leg and
EXCEPT ALL is absorbed by the consolidating-Union per-leg bucketing — see
temporal_bucketing.slt — but having the flag-clear in place protects against
future MIR producers of Threshold.)

4. The Union bucketing decision is split between LIR and render

#36644 folds refine_union_negate_consolidation's consolidate_output = true
decision into the Union arm of lowering, so the per-input
temporal_bucketing_strategies and consolidate_output are decided in the
same place from the same predicate. #36648 keeps the post-pass and has the
render arm combine input_strategies with consolidate_output to decide
whether a bucket op fires per leg.

This is correct, but architecturally unfortunate:

  • EXPLAIN PHYSICAL PLAN's
    input_strategies no longer reflects "where bucketing actually fires" —
    render does the final combination, so EXPLAIN won't show the effect of any
    future change in how the two fields are combined;
  • it introduces an
    optimization decision at render time, which we'd like to generally avoid.

Easiest near-term fix: fold the predicate into lowering as #36644 does,
deciding both fields together. A longer-term cleanup would be to keep
refine_union_negate_consolidation as a post-pass (post-pass adjustment of
consolidate_output is fine on its own) and add a sibling LIR post-pass that
recomputes the per-input bucketing strategies after the consolidation
decision is finalized — so the bucketing decision is also fully settled in
LIR before render runs.

5. ArrangeBy raw-only wraps

#36644 has a precursor commit that passes false instead of the upstream
future flag at the three synthetic arrange_by(..., new_raw(), ...) sites in
lowering (TopK permute, Negate permute, Union per-input permute),
because a raw-only ArrangeBy builds no arrangement and ensure_collections
forces Direct for raw-only paths anyway. Without that, the LIR records
strategy = TemporalBucketing on ArrangeBy nodes that physically cannot
bucket — misleading in EXPLAIN and a footgun for any future flag-propagation
analysis that reads the LIR. #36648 does not have this fix.

6. MonotonicTop1/K { must_consolidate: false } overhead

#36648 gates the bucket call on must_consolidate (good — avoids the overhead
of a bucket op with no downstream consolidator). #36644 takes a slightly
different angle: it argues (and documents with a soft_assert_or_log! at the
top of render_topk) that the combination "MonotonicTop1/MonotonicTopK
with must_consolidate = false together with TemporalBucketing" is
structurally impossible, because RelaxMustConsolidate (the only writer of
must_consolidate = false) runs only on single-time dataflows, and in
single-time dataflows ExprPrepOneShot constant-folds mz_now() to the
dataflow as_of before lowering — so has_future_updates is false
everywhere and lowering never picks TemporalBucketing to begin with.

Both PRs are functionally correct on this point, but #36648's
runtime gate vs. #36644's invariant + assert are different choices. #36648's
runtime gate is a rendering-time optimization decision, which we'd like to
generally avoid.

7. Default-format EXPLAIN

#36644 adds a Temporally-Bucketed prefix to the default-format physical-plan
printer for Reduce, TopK, and Union, mirroring the existing
Consolidating treatment. #36648 only updates the verbose-text printer.
Nice-to-have for debuggability of the feature.

8. Test coverage

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants